#!/usr/bin/python
# -*- coding: utf-8 -*-

# This file is part of Cockpit.
#
# Copyright (C) 2015 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

import parent
from testlib import *

# Fingerprints the tests expect to see for KEY: the colon-separated hex
# form, and the "SHA256:"-prefixed form.
FP_MD5 = '93:40:9e:67:82:78:a8:99:89:39:d5:ba:e0:50:70:e1'
FP_SHA256 = 'SHA256:SRvBhCmkCEVnJ6ascVH0AoVEbS3nPbowZkNevJnXtgw'

# Public key line in authorized_keys format: "<type> <base64 blob> <comment>".
KEY = " ".join([
    "ssh-rsa",
    ("AAAAB3NzaC1yc2EAAAADAQABAAABAQDG4iipTovcMg0xn+089QLNKVGpP"
     "2Pgq2duxHgAXre2XgA3dZL+kooioGFwBQSEjbWssKy82hKIN/W82/lQtL6krf7JQW"
     "nT3LZwD5DPsvHFKhOLghbiFzSI0uEL4NFFcZOMo5tGLrM5LsZsaIkkv5QkAE0tHIy"
     "eYinK6dQ2d8ZsfmgqxHDUQUWnz1T75X9fWQsUugSWI+8xAe0cfa4qZRz/IC+K7DEB"
     "3x4Ot5pl8FBuydJj/gb+Lwo2Vs27/d87W/0KHCqOHNwaVC8RBb1WcmXRDDetLGH1A"
     "9m5x7Ip/KU/cyvWWxw8S4VKZkTIcrGUhFYJDnjtE3Axz+D7agtps41t"),
    "test-name",
])

class TestKeys(MachineCase):
    """Browser-driven checks of SSH key handling in the Cockpit UI.

    testAuthorizedKeys exercises the authorized public key list on the
    account page; testPrivateKeys exercises the private key listing that
    is opened through '#credentials-item' in the navbar dropdown.
    """

    def testAuthorizedKeys(self):
        """Add, list and remove authorized public keys as "user" and "admin"."""
        m = self.machine
        b = self.browser

        # Selectors shared by the checks below
        keys_list = "#account-authorized-keys-list"
        key_items = keys_list + " div.list-group-item"
        first_item = key_items + ":first-child"
        last_item = key_items + ":last-child"

        # HACK: Logging in as "user" will try to write to
        # /var/lib/cockpit/, but this will fail.
        # https://github.com/cockpit-project/cockpit/issues/1248
        #
        self.allow_journal_messages(".*Failed to create file '/var/lib/cockpit/.*': Permission denied")

        # Create a user without any role
        m.execute("useradd user -s /bin/bash -m -c 'User' || true")
        m.execute("echo user:foobar | chpasswd")

        m.start_cockpit()

        def login(user, password, user_name):
            """Log in via the login page, then open the user's own account page.

            Waits for `user_name` in '#content-user-name' after login and
            for `user` in '#account-user-name' once the account page is up.
            """
            b.open("/system")
            b.wait_visible("#login")
            b.set_val("#login-user-input", user)
            b.set_val("#login-password-input", password)
            b.set_checked("#authorized-input", True)
            b.click('#login-button')
            b.expect_load()
            b.enter_page("/system")
            b.switch_to_top()
            b.wait_text('#content-user-name', user_name)

            b.click('#navbar-dropdown')
            b.wait_visible('#go-account')
            b.click('#go-account')
            b.enter_page("/users")
            b.wait_text("#account-user-name", user)

        def wait_key_count(count):
            """Wait until the key list holds exactly `count` list items."""
            b.wait_js_func("ph_count_check", key_items, count)

        def add_key(key, fp_md5, fp_sha256, comment):
            """Add `key` through the dialog and check it shows up in the list.

            The list may show either fingerprint format, so the expected
            one is picked based on whether "SHA256" appears in the text.
            """
            b.click('#authorized-key-add')
            b.wait_popup("add-authorized-key-dialog")
            b.wait_val("#authorized-keys-text", "")
            b.set_val("#authorized-keys-text", key)
            b.click("#add-authorized-key")
            b.wait_popdown("add-authorized-key-dialog")

            # Debian-8 doesn't output comments
            if m.image != "debian-8":
                b.wait_in_text(keys_list, comment)

            b.wait_not_in_text(keys_list, "no authorized public keys")
            text = b.text(keys_list)
            if "SHA256" in text:
                self.assertIn(fp_sha256, text)
            else:
                self.assertIn(fp_md5, text)

        # no keys
        login("user", "foobar", "User")
        b.wait_in_text(first_item, "no authorized public keys")
        wait_key_count(1)

        # add bad
        b.click('#authorized-key-add')
        b.wait_popup("add-authorized-key-dialog")
        b.wait_val("#authorized-keys-text", "")
        b.set_val("#authorized-keys-text", "bad")
        b.click("#add-authorized-key")
        b.wait_present(".dialog-error")
        b.click("#cancel-authorized-key")

        # add good
        add_key(KEY, FP_MD5, FP_SHA256, "test-name")

        # Try to see admin
        b.go("#/admin")
        b.wait_visible("#account")
        b.wait_text("#account-user-name", "admin")

        # Not allowed, except on Ubuntu and Debian, where we can find out
        # that ~/.ssh doesn't exist, which is shown as "no keys".
        if "ubuntu" not in m.image and "debian" not in m.image:
            b.wait_in_text(first_item, "You do not have permission")
            wait_key_count(1)

        b.logout()

        # delete whole ssh to start fresh
        m.execute("rm -rf /home/user/.ssh")
        # -A is required: plain `ls` never lists dot entries, which would
        # make this assertion pass even if the directory were still there.
        self.assertNotIn(".ssh", m.execute("ls -A /home/user"))

        # Log in as admin
        login("admin", "foobar", "Administrator")
        b.go("#/user")
        b.wait_in_text(first_item, "no authorized public keys")
        wait_key_count(1)

        # Adding keys sets permissions properly
        b.wait_text("#account-user-name", "user")
        add_key(KEY, FP_MD5, FP_SHA256, "test-name")
        perms = m.execute("getfacl -a /home/user/.ssh")
        self.assertIn("owner: user", perms)

        perms = m.execute("getfacl -a /home/user/.ssh/authorized_keys")
        self.assertIn("owner: user", perms)
        self.assertIn("user::rw-", perms)
        self.assertIn("group::---", perms)
        self.assertIn("other::---", perms)

        # Add invalid key directly
        m.execute("(echo '' && echo 'bad') >> /home/user/.ssh/authorized_keys")
        b.wait_in_text(last_item, "Invalid key")
        # Wait on the count like every other check here; a plain call
        # would discard the result and verify nothing.
        wait_key_count(2)

        # Removing the key
        b.click(last_item + " button")
        b.wait_not_in_text(last_item, "Invalid key")
        wait_key_count(1)
        data = m.execute("cat /home/user/.ssh/authorized_keys")
        self.assertEqual(data, KEY + '\n')
        b.logout()

        # User can still see their keys
        login("user", "foobar", "User")
        if m.image == "debian-8":
            b.wait_in_text(first_item, "rsa")
        else:
            b.wait_in_text(first_item, "test-name")

        b.click(first_item + " button")
        b.wait_in_text(first_item, "no authorized public keys")
        wait_key_count(1)

        self.allow_restart_journal_messages()
        self.allow_authorize_journal_messages()
        self.allow_journal_messages('authorized_keys is not a public key file.')
        self.allow_journal_messages('Missing callback called fullpath = /home/user/.ssh/authorized_keys')
        # NOTE(review): presumably allows empty journal lines -- confirm
        # against how testlib matches allowed patterns.
        self.allow_journal_messages('')

    def testPrivateKeys(self):
        """Inspect, load, unload and re-password the admin's private keys."""
        b = self.browser
        m = self.machine

        def list_keys():
            """Return what `ssh-add -l` prints when spawned through cockpit."""
            return b.eval_js("cockpit.spawn([ '/bin/sh', '-c', 'ssh-add -l || true' ])")

        # Operating systems where auto loading doesn't work
        auto_load = "atomic" not in m.image

        # Put all the keys in place
        m.execute("mkdir -p /home/admin/.ssh")
        m.upload([
            "verify/files/ssh/id_rsa",
            "verify/files/ssh/id_dsa",
            "verify/files/ssh/id_rsa.pub",
            "verify/files/ssh/id_dsa.pub"
        ], "/home/admin/.ssh/")
        m.execute("chmod 600 /home/admin/.ssh/*")
        m.execute("chown -R admin:admin /home/admin/.ssh")

        self.login_and_go()

        # Expected `ssh-add -l` entries: key size followed by the
        # fingerprint, in the SHA256 form and in the older hex form.
        # The RSA fingerprints are the module-level ones.
        id_rsa = "2048 " + FP_SHA256
        old_rsa = "2048 " + FP_MD5
        id_dsa = "1024 SHA256:x6S6fxMuEyqhpwNRAIK7ms6bZDY6xK9wzdDr2kCaWVY"
        old_dsa = "1024 d4:55:41:20:f1:0a:e0:52:15:fc:fc:f0:63:22:1f:76"

        # Per-key table sections in the key listing
        rsa_row = "tbody[data-name=id_rsa]"
        dsa_row = "tbody[data-name=id_dsa]"

        keys = list_keys()
        if auto_load:
            if "SHA256" in keys:
                self.assertIn(id_rsa, keys)
            else:
                self.assertIn(old_rsa, keys)
        self.assertNotIn(id_dsa, keys)
        self.assertNotIn(old_dsa, keys)

        b.click("#navbar-dropdown")
        b.click("#credentials-item")

        # Check the key display
        b.wait_present(rsa_row)

        # Automatically loaded
        if auto_load:
            self.assertTrue(b.is_present(rsa_row + " .btn.active:contains('On')"))
        b.click(rsa_row + " tr.listing-ct-item")
        self.assertTrue(b.is_visible(rsa_row + " .listing-ct-body"))
        self.assertTrue(b.is_visible(rsa_row + " ul"))
        self.assertTrue(b.is_present(rsa_row + " li.active:contains('Details')"))
        self.assertEqual(b.text(rsa_row + " .credential-comment"), "test@test")
        self.assertEqual(b.text(rsa_row + " .credential-type"), "RSA")
        # The details show the bare fingerprint, without the key size
        text = b.text(rsa_row + " .credential-fingerprint")
        if "SHA256" in text:
            self.assertEqual(text, FP_SHA256)
        else:
            self.assertEqual(text, FP_MD5)

        b.click(rsa_row + " li:contains('Public Key') a")
        self.assertTrue(b.is_visible(rsa_row + " textarea"))
        self.assertEqual(b.text(rsa_row + " textarea"),
                         "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDG4iipTovcMg0xn+089QLNKVGpP" +
                         "2Pgq2duxHgAXre2XgA3dZL+kooioGFwBQSEjbWssKy82hKIN/W82/lQtL6krf7JQW" +
                         "nT3LZwD5DPsvHFKhOLghbiFzSI0uEL4NFFcZOMo5tGLrM5LsZsaIkkv5QkAE0tHIy" +
                         "eYinK6dQ2d8ZsfmgqxHDUQUWnz1T75X9fWQsUugSWI+8xAe0cfa4qZRz/IC+K7DEB" +
                         "3x4Ot5pl8FBuydJj/gb+Lwo2Vs27/d87W/0KHCqOHNwaVC8RBb1WcmXRDDetLGH1A" +
                         "9m5x7Ip/KU/cyvWWxw8S4VKZkTIcrGUhFYJDnjtE3Axz+D7agtps41t test@test\n")
        # Clicking the item row again collapses the details
        b.click(rsa_row + " tr.listing-ct-item")
        self.assertFalse(b.is_visible(rsa_row + " .listing-ct-body"))

        # Load the id_dsa key
        b.wait_present(dsa_row)
        self.assertTrue(b.is_visible(dsa_row + " .btn.active:contains('Off')"))
        b.click(dsa_row + " .btn:contains('On')")
        self.assertTrue(b.is_present(dsa_row + " .credential-unlock"))
        # NOTE(review): "badbad" is presumably the passphrase the uploaded
        # id_dsa file was created with -- confirm against the fixture.
        b.set_val(dsa_row + " .credential-password", "badbad")
        b.click(dsa_row + " .credential-unlock .btn")
        b.wait_present(dsa_row + "[data-loaded=1]")
        self.assertTrue(b.is_present(dsa_row + " .btn.active:contains('On')"))

        # Both keys are now loaded
        keys = list_keys()
        if "SHA256" in keys:
            if auto_load:
                self.assertIn(id_rsa, keys)
            self.assertIn(id_dsa, keys)
        else:
            if auto_load:
                self.assertIn(old_rsa, keys)
            self.assertIn(old_dsa, keys)

        # Unload the RSA key
        b.click(rsa_row + " .btn:contains('Off')")
        b.wait_present(rsa_row + "[data-loaded=0]")

        # Only DSA keys now loaded
        keys = list_keys()
        if "SHA256" in keys:
            self.assertIn(id_dsa, keys)
        else:
            self.assertIn(old_dsa, keys)
        self.assertNotIn(id_rsa, keys)
        self.assertNotIn(old_rsa, keys)

        # Change password of DSA key
        b.click(dsa_row + " li:contains('Password') a")
        b.set_val(dsa_row + " .credential-old", "badbad")
        b.set_val(dsa_row + " .credential-new", "foobar")
        b.set_val(dsa_row + " .credential-two", "foobar")
        b.click(dsa_row + " .credential-change")
        b.wait_present(dsa_row + " li.active:contains('Details')")

        # Log off and log back in, and we should have both loaded automatically
        if auto_load:
            b.logout()
            b.login_and_go()
            keys = list_keys()
            if "SHA256" in keys:
                self.assertIn(id_rsa, keys)
                self.assertIn(id_dsa, keys)
            else:
                self.assertIn(old_rsa, keys)
                self.assertIn(old_dsa, keys)

# Run the tests only when this file is executed as a script
if __name__ == "__main__":
    test_main()
